Generic Message Queue implementation using AWS SQS and .NET 6 BackgroundService
Requirement
In most enterprise projects we develop, we have to implement cross-cutting concerns such as audit logs. The important requirement is that these audit logs (or similar cross-cutting concerns) must not degrade application performance. So how do we implement them without compromising performance?
BackgroundService
Background tasks and scheduled jobs are something you might need to use in any application, whether or not it follows the microservices architecture pattern. The difference when using a microservices architecture is that you can implement the background task in a separate process/container for hosting so you can scale it down/up based on your need.
From a generic point of view, in .NET we call these types of tasks Hosted Services, because they are services/logic that you host within your host/application/microservice. Note that in this case, the hosted service simply means a class with the background task logic.
In our sample we will configure the BackgroundService in the same Web API project for the sake of simplicity, but in a real production scenario you should consider a separate service. This way the BackgroundService can offload the audit log writing from the Web API. The challenge now is: how do we send the audit log objects to the BackgroundService?
Message Queue
Message queues allow different parts of a system to communicate and process operations asynchronously. A message queue provides a lightweight buffer which temporarily stores messages, and endpoints that allow software components to connect to the queue in order to send and receive messages. The messages are usually small, and can be things like requests, replies, error messages, or just plain information. To send a message, a component called a producer adds a message to the queue. The message is stored on the queue until another component called a consumer retrieves the message and does something with it.
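To make the producer and consumer roles concrete, here is a minimal in-memory sketch. It uses System.Threading.Channels as a stand-in for a queue; it is not SQS, and the class name QueueDemo and the sample messages are made up for illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class: an in-memory channel plays the role of the queue.
public static class QueueDemo
{
    public static async Task<List<string>> RunAsync()
    {
        // The buffer that temporarily stores messages.
        var queue = Channel.CreateUnbounded<string>();
        var processed = new List<string>();

        // Consumer: retrieves messages until the producer signals completion.
        var consumer = Task.Run(async () =>
        {
            await foreach (var message in queue.Reader.ReadAllAsync())
            {
                processed.Add($"handled: {message}");
            }
        });

        // Producer: adds messages and returns immediately, without waiting
        // for the consumer to process them.
        await queue.Writer.WriteAsync("user 42 logged in");
        await queue.Writer.WriteAsync("user 42 updated profile");
        queue.Writer.Complete();

        await consumer;
        return processed;
    }
}
```

A real message queue such as SQS adds what this sketch lacks: the messages survive a process restart, and the producer and consumer can run in different processes or on different machines.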
There are different implementations of message queues, and multiple cloud providers offer their own. Here we’re using AWS SQS.
Steps
Create an ASP.NET Core Web API
Install Dependencies
Create a folder Services -> Contracts and create a generic interface called IMessageService as follows:
public interface IMessageService<T>
{
    Task DeleteMessageAsync(string id);
    Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1);
    Task SendMessage(T message);
}
Now let’s create the SQS message service by implementing the above interface.
Let’s first create the constructor, which will create the AWS SQS client. Before doing that we need to create an IAM user with the required permissions.
Create IAM user
- Go to the AWS console and search for IAM.
- Click on the Users panel on the left.
- Click on the Add User button.
- Provide a user name, select the Access key - Programmatic access checkbox, and click the Next: Permissions button.
- Click on the Attach existing policies directly tab.
- Search for AmazonSQSFullAccess and select that policy. We require full access because we will create the queue programmatically if it does not exist.
- Click on the Next button twice and finally click on the Create User button.
- Copy the Access key ID and Secret access key and store them in a safe place.
Configure the AWS Credentials
There are multiple ways to configure the AWS credentials. I used the AWS CLI, which can be downloaded from the AWS website. Once it’s downloaded and installed on your machine, run the command below in a terminal or command prompt.
aws configure
The above command prompts for the following details and stores them under ~/.aws (the keys in the credentials file and the region in the config file). The AWS SDK reads these values when it creates the clients.
- AWS Access Key ID
- AWS Secret Access Key
- Default region name
Constructor code will look like the below:
public SqsGenericService(ILogger<SqsGenericService<T>> logger, IConfiguration configuration, IHostingEnvironment env)
{
    _logger = logger;
    // Reads the AWS profile and region from the application configuration
    var options = configuration.GetAWSOptions();
    //This queueName will be used to create the SQS Queue for each type of object in different environments
    var queueName = $"que-{env.EnvironmentName.ToLower()}-{typeof(T).Name.ToLower()}";
    _amazonSQSClient = options.CreateServiceClient<IAmazonSQS>();
    // Blocks until the queue URL is resolved (or the queue is created)
    _queueUrl = GetQueueUrl(queueName).Result;
}
Most of the code here is self-explanatory. The call to configuration.GetAWSOptions() fetches the AWS configuration.
Dynamic queue creation for each environment and entity
The queueName variable is built by concatenating the environment name and the name of the generic entity. The GetQueueUrl() method fetches the queue URL if the queue already exists; otherwise it creates the queue.
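The body of GetQueueUrl() is not shown here, so the following is only a sketch of one way such a lookup-or-create step could be written. It assumes the AWSSDK.SQS package; the helper class SqsQueueHelper and the method name GetOrCreateQueueUrlAsync are hypothetical, not taken from the original code.

```csharp
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical helper; the original GetQueueUrl() may be implemented differently.
public static class SqsQueueHelper
{
    public static async Task<string> GetOrCreateQueueUrlAsync(IAmazonSQS client, string queueName)
    {
        try
        {
            // Succeeds when a queue with this name already exists.
            var existing = await client.GetQueueUrlAsync(queueName);
            return existing.QueueUrl;
        }
        catch (QueueDoesNotExistException)
        {
            // No such queue yet: create it with default attributes.
            var created = await client.CreateQueueAsync(queueName);
            return created.QueueUrl;
        }
    }
}
```

Creating the queue on demand is the reason the IAM user needs permission to create queues and not only to send and receive messages.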
The next method is SendMessage. This method accepts a generic message object and uses the AWS SQS client to push the serialized object to the queue.
public async Task SendMessage(T message)
{
    // Serialize the object to JSON so it can travel as the message body
    var messageBody = JsonConvert.SerializeObject(message);
    await _amazonSQSClient.SendMessageAsync(new SendMessageRequest
    {
        QueueUrl = _queueUrl,
        MessageBody = messageBody
    });
    _logger.LogInformation("Message {message} sent successfully to {_queueUrl}.", message, _queueUrl);
}
The next method is ReceiveMessageAsync. This method fetches messages from the queue and converts them to a Dictionary that maps each message's ReceiptHandle to its deserialized MessageBody, because the consumer of this service needs the ReceiptHandle to delete the message after processing it.
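The receive and delete methods are described but not listed, so here is a rough sketch of how they might look. It assumes the AWSSDK.SQS and Newtonsoft.Json packages; the class name SqsReceiverSketch and the 10-second long-polling wait are assumptions made for illustration, not values from the original code.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Newtonsoft.Json;

// Hypothetical class holding only the receive/delete half of the service.
public class SqsReceiverSketch<T>
{
    private readonly IAmazonSQS _amazonSQSClient;
    private readonly string _queueUrl;

    public SqsReceiverSketch(IAmazonSQS amazonSQSClient, string queueUrl)
    {
        _amazonSQSClient = amazonSQSClient;
        _queueUrl = queueUrl;
    }

    public async Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1)
    {
        var response = await _amazonSQSClient.ReceiveMessageAsync(new ReceiveMessageRequest
        {
            QueueUrl = _queueUrl,
            MaxNumberOfMessages = maxMessages, // SQS accepts values from 1 to 10
            WaitTimeSeconds = 10               // assumed long-polling wait
        });

        // Key: receipt handle (needed for deletion). Value: deserialized body.
        var result = new Dictionary<string, T?>();
        foreach (var message in response.Messages ?? new List<Message>())
        {
            result[message.ReceiptHandle] = JsonConvert.DeserializeObject<T>(message.Body);
        }
        return result;
    }

    // The id parameter is the receipt handle returned by ReceiveMessageAsync.
    public Task DeleteMessageAsync(string id) =>
        _amazonSQSClient.DeleteMessageAsync(_queueUrl, id);
}
```

A message that is received but never deleted becomes visible again after the queue's visibility timeout, so the consumer should delete it only after processing has succeeded.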
Worker Process
To implement the worker process, as mentioned earlier, I decided to use BackgroundService. Here is the complete code for the AuditLogWorker class.
public class AuditLogWorker : BackgroundService
{
    private readonly ILogger<AuditLogWorker> _logger;
    private readonly IMessageService<AuditLogModel> _messageClient;

    public AuditLogWorker(ILogger<AuditLogWorker> logger, IMessageService<AuditLogModel> messageClient)
    {
        _logger = logger;
        _messageClient = messageClient;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("AuditLogWorker running at: {Time}", DateTime.Now);
            var messages = await _messageClient.ReceiveMessageAsync();
            foreach (var message in messages)
            {
                //You can write your custom logic here...
                _logger.LogInformation("AuditLogWorker processed message {userID}, {Action}", message.Value?.UserId, message.Value?.Message);
                await _messageClient.DeleteMessageAsync(message.Key);
            }
            await Task.Delay(5000, stoppingToken); //Delay can be set according to your business requirement.
        }
    }
}
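The worker reads UserId and Message from AuditLogModel, but the model itself is not listed. A minimal version could look like the sketch below; only the two property names come from the worker code, while the string types are an assumption.

```csharp
// Sketch of the model used by the worker. The property names are taken from
// the worker code above; the string types are assumed.
public class AuditLogModel
{
    public string? UserId { get; set; }
    public string? Message { get; set; }
}
```

Any additional fields you need in the audit trail (for example a timestamp) can be added here, since the message service serializes the whole object.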
Modify Program.cs to add the necessary dependencies and configure the hosted service as below:
builder.Services.AddSingleton<IMessageService<AuditLogModel>, SqsGenericService<AuditLogModel>>();
builder.Services.AddHostedService<AuditLogWorker>();
Create a simple REST API method to accept an object and push that item to the queue and test it. The testing method will look like the below:
[HttpPost]
public async Task Post([FromBody] AuditLogModel model)
{
    await _messageClient.SendMessage(model);
    _logger.LogInformation("Message pushed to the queue successfully.");
}
That’s it folks, I hope everybody enjoyed the blog. The entire code can be downloaded from the GitHub repo.
Happy coding!!!